{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "import nest_asyncio\n",
    "nest_asyncio.apply()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_index.llms.ollama import Ollama\n",
    "from llama_index.embeddings.huggingface import HuggingFaceEmbedding\n",
    "from llama_index.core.settings import Settings\n",
    "\n",
    "llm = Ollama(model=\"llama3.2\")\n",
    "embed_model = HuggingFaceEmbedding(model_name=\"BAAI/bge-small-en-v1.5\")\n",
    "\n",
    "Settings.llm = llm\n",
    "Settings.embed_model = embed_model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_index.core.workflow import Event\n",
    "from llama_index.core.schema import NodeWithScore\n",
    "\n",
    "\n",
    "class RetrieverEvent(Event):\n",
    "    \"\"\"Result of running retrieval\"\"\"\n",
    "\n",
    "    nodes: list[NodeWithScore]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_index.core import SimpleDirectoryReader, VectorStoreIndex\n",
    "from llama_index.core.response_synthesizers import CompactAndRefine\n",
    "from llama_index.core.workflow import (\n",
    "    Context,\n",
    "    Workflow,\n",
    "    StartEvent,\n",
    "    StopEvent,\n",
    "    step,\n",
    ")\n",
    "\n",
    "class RAGWorkflow(Workflow):\n",
    "    @step\n",
    "    async def ingest(self, ctx: Context, ev: StartEvent) -> StopEvent | None:\n",
    "        \"\"\"Entry point to ingest a document, triggered by a StartEvent with `dirname`.\"\"\"\n",
    "        dirname = ev.get(\"dirname\")\n",
    "        if not dirname:\n",
    "            return None\n",
    "\n",
    "        documents = SimpleDirectoryReader(dirname).load_data()\n",
    "        index = VectorStoreIndex.from_documents(\n",
    "            documents=documents,\n",
    "        )\n",
    "        return StopEvent(result=index)\n",
    "\n",
    "    @step\n",
    "    async def retrieve(\n",
    "        self, ctx: Context, ev: StartEvent\n",
    "    ) -> RetrieverEvent | None:\n",
    "        \"Entry point for RAG, triggered by a StartEvent with `query`.\"\n",
    "        query = ev.get(\"query\")\n",
    "        index = ev.get(\"index\")\n",
    "\n",
    "        if not query:\n",
    "            return None\n",
    "\n",
    "        print(f\"Query the database with: {query}\")\n",
    "\n",
    "        # store the query in the global context\n",
    "        await ctx.set(\"query\", query)\n",
    "\n",
    "        # get the index from the global context\n",
    "        if index is None:\n",
    "            print(\"Index is empty, load some documents before querying!\")\n",
    "            return None\n",
    "\n",
    "        retriever = index.as_retriever(similarity_top_k=2)\n",
    "        nodes = await retriever.aretrieve(query)\n",
    "        print(f\"Retrieved {len(nodes)} nodes.\")\n",
    "        return RetrieverEvent(nodes=nodes)\n",
    "\n",
    "    @step\n",
    "    async def synthesize(self, ctx: Context, ev: RetrieverEvent) -> StopEvent:\n",
    "        \"\"\"Return a streaming response using reranked nodes.\"\"\"\n",
    "        # llm = OpenAI(model=\"gpt-4o-mini\")\n",
    "        # summarizer = CompactAndRefine(llm=llm, streaming=True, verbose=True)\n",
    "        summarizer = CompactAndRefine(streaming=True, verbose=True)\n",
    "        query = await ctx.get(\"query\", default=None)\n",
    "\n",
    "        response = await summarizer.asynthesize(query, nodes=ev.nodes)\n",
    "        return StopEvent(result=response)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The first entrypoint is ingestion"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [],
   "source": [
    "w = RAGWorkflow()\n",
    "\n",
    "# Ingest the documents\n",
    "index = await w.run(dirname=\"data\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The second entry point is retrieval"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Query the database with: How was DeepSeekR1 trained?\n",
      "Retrieved 2 nodes.\n",
      "DeepSeek-R1 was trained using multi-stage training and cold-start data before reinforcement learning (RL). This approach incorporates a rule-based reward system that uses accuracy rewards to evaluate response correctness and format rewards to enforce thinking process tagging. The model begins with a straightforward template guiding it to produce a reasoning process followed by the final answer, while intentionally limiting constraints to avoid content-specific biases."
     ]
    }
   ],
   "source": [
    "# Run a query\n",
    "result = await w.run(query=\"How was DeepSeekR1 trained?\", index=index)\n",
    "async for chunk in result.async_response_gen():\n",
    "    print(chunk, end=\"\", flush=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "env_llamaindex",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
